讓我們開始實作其他核心 API 吧!以下為完整程式:
import java.util.concurrent.*
object Par {
opaque type Par[A] = ExecutorService => Future[A]
private case class UnitFuture[A](get: A) extends Future[A]:
def isDone = true
def get(timeout: Long, units: TimeUnit) = get
def isCancelled = false
def cancel(evenIfRunning: Boolean): Boolean = false
def unit[A](a: A): Par[A] =
_ => UnitFuture(a)
extension[A] (pa: Par[A])
def run(s: ExecutorService): Future[A] = pa(s)
def map2[B, C](pb: Par[B])(f: (A, B) => C): Par[C] =
(es: ExecutorService) =>
val af = pa(es)
val bf = pb(es)
UnitFuture(f(af.get, bf.get))
def fork[A](a: => Par[A]): Par[A] =
(es: ExecutorService) =>
es.submit(new Callable[A] {
def call = a(es).get
})
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
}
首先 unit
是一個把一般的值轉為 Future,該 Future 不需要使用 ExecutorService 來運行,代表也不需要支持 cancel 功能,Future 的 get 也是直接回傳 unit 的入參,
也因為 Future 並不是純粹的 functional 介面,且它有依賴 side effect 做事,基於此,我們將 Future 類別擴充成新類別 UnitFuture, 讓 unit 的實作就是直接 new UnitFuture 即可;
在來我們用了 extension 來擴展 Par,把 run 和 map2 都放在下面,讓我們能直接以 Par 來呼叫 run 和 map2;
extension 的說明可看 Day 14
map2
也是在取得 Par[A] 和 Par[B] 的結果回傳 UnitFuture;
fork
我們先用一個最簡單和自然的方式來實作。
Exercise D17-1
使用 lazyUnit 實作 asyncF function,asyncF 能使用入參 f: A => B
把回傳值 high-order function 中的 A 轉成 Par[B]。
def asyncF[A, B](f: A => B): A => Par[B]
接下來我們看如何透過核心 API 來做各種組合操作,假設我們有 Par[List[Int]]
,然後我們想針對 Par 中的 List 做排序,
def sortPar(parList: Par[List[Int]]): Par[List[Int]]
首先我們當然可以 run Par,取得 List 結果後在排序,最後在用 unit 包裹起來,但我們應當避免調用 run,目前核心 API 中能 Par 值的 function 是 map2,所以我們應該用它來實現,
def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
parList.map2(unit(()))((a, _) => a.sorted)
這裡可以看到 map2 的另一個參數不重要,所以我們用 unit(())
來產生個空 Par 給 map2,既然我們只在意一個參數,我們可以抽象這個概念提升成 map function(我將此 map function 放到 extension 下),
def map[B](f: A => B): Par[B] =
map2(unit(()))((a, _) => f(a))
現在我們的 sortPar 可以這樣實現。
def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
parList.map(_.sorted)
我們可以使用 f: A => B
來平行化運行的轉換 list 中所有元素嗎?不像 map2 是合併 2 個 Par 成一個 Par,這裡需要合併 N 個 Par 成一個 Par,首先我們可以用剛剛的 asyncF 把 List[A]
轉成新的 List[Par[B]]
,但下一步呢?
def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] =
val fbs: List[Par[B]] = ps.map(asyncF(f))
???
看起來我們需要個 function 把 List[Par[B]]
轉成 Par[List[B]]
,
def sequence[A](pas: List[Par[A]]): Par[List[A]] =
pas.foldRight(unit(List.empty[A]))((pa, acc) => pa.map2(acc)(_ :: _))
sequence
使用了 foldRight 來和 map2 來合併 List 中的所有值,
foldRight 的說明可以看 Day 4。
現在我們可以完善 parMap 了,這裡我們多調用了 fork
來讓它使用新的邏輯執行緒運行,即使我們的入參是很大的 List,parMap 會立即回傳 Par[List[B]] 回來。
def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] = fork:
val fbs: List[Par[B]] = ps.map(asyncF(f))
sequence(fbs)
Exercise D17-2
(Hard) 實作 parFilter,它能平行化的過濾 List 中所有元素。
def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]]
Purely Function 的平行化 還沒完喔,明天繼續!